package com.om.opensourway;

import Utils.HttpClientUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Base64;
import java.util.Map;
import java.util.Properties;

/**
 * 打印从 Kafka 中获取的数据
 */
public class giteeBolt extends BaseRichBolt {
    static Properties conf = new Properties();
    public static CloseableHttpClient httpClient;
    static Logger logger;

    static {
        try {
            logger = org.apache.log4j.Logger.getLogger("STDERR");
            httpClient = HttpClientUtils.getClient();
            InputStream resourceAsStream = AnalysisForStreaming.class.getResourceAsStream("/conf.properties");
            conf.load(resourceAsStream);
        }
         catch (IOException e) {
            e.printStackTrace();
        }
    }

    URI uri = new URIBuilder().setScheme(conf.getProperty("es.scheme")).setHost(conf.getProperty("es.host")).setPort(Integer.parseInt(conf.getProperty("es.port"))).setPath("/_bulk").build();

    private OutputCollector collector;

    public giteeBolt() throws URISyntaxException {
    }


    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }


    public void execute(Tuple input) {
        try {
            String value = input.getStringByField("value");
            CloseableHttpResponse execute = null;
            try {
                HttpPost httpPost = new HttpPost(uri);
                httpPost.setEntity(new StringEntity(value.toString(), "UTF-8"));
                httpPost.addHeader(HttpHeaders.CONTENT_TYPE, "application/x-ndjson");
                httpPost.addHeader("Authorization", "Basic " + Base64.getEncoder().encodeToString((conf.getProperty("es.user") + ":" + conf.get("es.passwd")).getBytes()));
                execute = httpClient.execute(httpPost);
            } catch (Exception e) {
                logger.error(e.getMessage());
                logger.error(EntityUtils.toString(execute.getEntity()), e);
            } finally {
                if (execute != null) {
                    EntityUtils.consume(execute.getEntity());
                }
            }
            collector.ack(input);
        } catch (Exception e) {
            collector.fail(input);
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}
